{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Chicago taxi fare training experience \n",
    "\n",
    "This experiment uses a Scikit-learn Random Forest to train an ML model on the Chicago taxi dataset that estimates the taxi trip fare for a given time and start/end locations. The selected approach and feature engineering are based on the https://github.com/v-loves-avocados/chicago-taxi data exploration and analysis by [Aradhana Chaturvedi](https://www.linkedin.com/in/aradhana-chaturvedi-b91b8818)."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import numpy as np\n",
    "import pandas as pd\n",
    "from pandas_profiling import ProfileReport\n",
    "from scipy import stats\n",
    "\n",
    "from sklearn.ensemble import RandomForestRegressor\n",
    "from sklearn.compose import ColumnTransformer\n",
    "from sklearn.model_selection import train_test_split, cross_val_score, GridSearchCV\n",
    "from sklearn.pipeline import Pipeline\n",
    "from sklearn.preprocessing import OneHotEncoder, StandardScaler\n",
    "\n",
    "# MLflow\n",
    "import mlflow\n",
    "import mlflow.sklearn\n",
    "\n",
    "# plotting libraries:\n",
    "import matplotlib.pyplot as plt\n",
    "import matplotlib as mpl\n",
    "import seaborn as sns\n",
    "\n",
    "\n",
    "# Google clients\n",
    "import google.auth\n",
    "from google.cloud import bigquery\n",
    "from google.cloud import bigquery_storage\n",
    "\n",
    "# Set default appearance:\n",
    "# - override matplotlib's default colours with the seaborn theme\n",
    "# - use a larger default figure size (width, height in inches)\n",
    "sns.set(color_codes=True)\n",
    "mpl.rcParams['figure.figsize'] = [13, 8]\n",
    "%matplotlib inline"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# BigQuery source identifiers.\n",
    "BQ_DATASET = 'chicago_taxi_trips'\n",
    "BQ_TABLE = 'taxi_trips'\n",
    "\n",
    "# Sampling query: selects trips with known coordinates, positive fare and positive mileage,\n",
    "# shuffles them with RAND() and keeps at most 20000 rows, then derives the start-time parts.\n",
    "# The `{}` placeholder is filled through str.format with a predicate on the hash bucket\n",
    "# MOD(ABS(FARM_FINGERPRINT(unique_key)), 100).\n",
    "BQ_QUERY = \"\"\"\n",
    "with tmp_table as (\n",
    "SELECT trip_seconds, trip_miles, fare, tolls, \n",
    "    company, pickup_latitude, pickup_longitude, dropoff_latitude, dropoff_longitude,\n",
    "    DATETIME(trip_start_timestamp, 'America/Chicago') trip_start_timestamp,\n",
    "    DATETIME(trip_end_timestamp, 'America/Chicago') trip_end_timestamp,\n",
    "    CASE WHEN (pickup_community_area IN (56, 64, 76)) OR (dropoff_community_area IN (56, 64, 76)) THEN 1 else 0 END is_airport,\n",
    "FROM `bigquery-public-data.chicago_taxi_trips.taxi_trips`\n",
    "WHERE\n",
    "  dropoff_latitude IS NOT NULL and\n",
    "  dropoff_longitude IS NOT NULL and\n",
    "  pickup_latitude IS NOT NULL and\n",
    "  pickup_longitude IS NOT NULL and\n",
    "  fare > 0 and \n",
    "  trip_miles > 0 and\n",
    "  MOD(ABS(FARM_FINGERPRINT(unique_key)), 100) {}\n",
    "ORDER BY RAND()\n",
    "LIMIT 20000)\n",
    "SELECT *,\n",
    "    EXTRACT(YEAR FROM trip_start_timestamp) trip_start_year,\n",
    "    EXTRACT(MONTH FROM trip_start_timestamp) trip_start_month,\n",
    "    EXTRACT(DAY FROM trip_start_timestamp) trip_start_day,\n",
    "    EXTRACT(HOUR FROM trip_start_timestamp) trip_start_hour,\n",
    "    FORMAT_DATE('%a', DATE(trip_start_timestamp)) trip_start_day_of_week\n",
    "FROM tmp_table\n",
    "\"\"\"\n",
    "\n",
    "# Resolve the default Google credentials (cloud-platform scope) and build both BigQuery clients.\n",
    "credentials, your_project_id = google.auth.default(scopes=['https://www.googleapis.com/auth/cloud-platform'])\n",
    "bqclient = bigquery.Client(project=your_project_id, credentials=credentials)\n",
    "bqstorageclient = bigquery_storage.BigQueryReadClient(credentials=credentials)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Query dataset"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Fill the `{}` placeholder so that every hash bucket (0-99) is eligible, run the query and\n",
    "# download the result into a pandas DataFrame through the BigQuery Storage client.\n",
    "df = bqclient.query(BQ_QUERY.format('between 0 and 99')).result().to_dataframe(\n",
    "    bqstorage_client=bqstorageclient\n",
    ")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Column info\n",
    "\n",
    "Watch the number of null values in the 'Non-Null Count' column."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# DataFrame.info() prints its report itself and returns None, so call it directly;\n",
    "# wrapping it in display() would additionally render a stray `None`.\n",
    "df.info()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Raw descriptive statistics"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Summary statistics of the frame as downloaded, before feature_engineering() is applied.\n",
    "display(df.describe())"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Feature engineering"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "def feature_engineering(data):\n",
    "    # Add 'N/A' for missing 'Company'\n",
    "    data.fillna(value={'company':'N/A','tolls':0}, inplace=True)\n",
    "    # Drop rows contains null data.\n",
    "    data.dropna(how='any', axis='rows', inplace=True)\n",
    "    # Pickup and dropoff locations distance\n",
    "    data['abs_distance'] = (np.hypot(data['dropoff_latitude']-data['pickup_latitude'], data['dropoff_longitude']-data['pickup_longitude']))*100\n",
    "\n",
    "    # Remove extremes, outliers\n",
    "    possible_outliers_cols = ['trip_seconds', 'trip_miles', 'fare', 'abs_distance']\n",
    "    data=data[(np.abs(stats.zscore(data[possible_outliers_cols])) < 3).all(axis=1)].copy()\n",
    "    # Reduce location accuracy\n",
    "    data=data.round({'pickup_latitude': 3, 'pickup_longitude': 3, 'dropoff_latitude':3, 'dropoff_longitude':3})\n",
    "    return data"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# NOTE: `df` is overwritten with the cleaned frame, so re-running this cell applies the\n",
    "# cleaning (including the z-score outlier filter) again to already-cleaned data.\n",
    "df=feature_engineering(df)\n",
    "display(df.describe())"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "#### Remaining null values per column after feature engineering"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Null count per column, largest first.\n",
    "print(df.isnull().sum().sort_values(ascending=False))"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Data profiling\n",
    "\n",
    "(executing the next cell takes a long time)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Build the pandas-profiling report for the cleaned frame and embed it in the notebook.\n",
    "ProfileReport(df, title='Chicago taxi dataset profiling Report').to_notebook_iframe()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Visualize dropoff locations"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Dropoff locations coloured by fare. Drawn on an explicit Axes with a title, axis labels\n",
    "# and a labelled colour bar so the figure can be read on its own; plt.show() keeps the\n",
    "# Colorbar repr out of the cell output.\n",
    "fig, ax = plt.subplots()\n",
    "sc = ax.scatter(df.dropoff_longitude, df.dropoff_latitude, c=df['fare'], cmap='summer')\n",
    "ax.set(title='Dropoff locations coloured by fare', xlabel='dropoff_longitude', ylabel='dropoff_latitude')\n",
    "fig.colorbar(sc, ax=ax, label='fare')\n",
    "plt.show()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "#### Location histograms"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# One two-panel figure per trip endpoint: longitude on top, latitude below.\n",
    "for endpoint in ('pickup', 'dropoff'):\n",
    "    fig, axs = plt.subplots(2)\n",
    "    fig.suptitle(f'{endpoint.capitalize()} location histograms')\n",
    "    for ax, coordinate in zip(axs, ('longitude', 'latitude')):\n",
    "        df.hist(f'{endpoint}_{coordinate}', bins=100, ax=ax)\n",
    "    plt.show()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Time based explorations"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "#### Trip start distribution"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "fig, axs = plt.subplots(4)\n",
    "fig.suptitle('Trip start histograms')\n",
    "fig.set_size_inches(18, 12, forward=True)\n",
    "# One panel per time component, top to bottom: (column, number of bins).\n",
    "for ax, (column, bins) in zip(axs, [\n",
    "    ('trip_start_year', 8),\n",
    "    ('trip_start_month', 12),\n",
    "    ('trip_start_day', 31),\n",
    "    ('trip_start_hour', 24),\n",
    "]):\n",
    "    df.hist(column, bins=bins, ax=ax)\n",
    "plt.show()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "#### Trip length"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "fig, axs = plt.subplots(2)\n",
    "fig.set_size_inches(18, 8, forward=True)\n",
    "# Distance on the top panel, duration on the bottom one.\n",
    "for ax, column in zip(axs, ('trip_miles', 'trip_seconds')):\n",
    "    df.hist(column, bins=50, ax=ax)\n",
    "plt.show()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "#### Fare by trip start hour"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Mean fare per trip start hour. Series.plot() returns the Axes; label it and call\n",
    "# plt.show() instead of passing the Axes to display(), which only prints its repr.\n",
    "ax = df.groupby('trip_start_hour')['fare'].mean().plot()\n",
    "ax.set(title='Mean fare by trip start hour', xlabel='trip_start_hour', ylabel='mean fare')\n",
    "plt.show()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Split dataframe into examples and output"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Record the shape of the cleaned frame as an MLflow parameter.\n",
    "mlflow.log_param('training_shape', str(df.shape))\n",
    "\n",
    "# Examples: every column except the raw start timestamp (the target column 'fare' is still included here).\n",
    "X = df.drop(columns=['trip_start_timestamp'])\n",
    "# Output: the trip fare.\n",
    "y = df['fare']"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Training pipeline"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Global variables\n",
    "# MLflow experiment searched by model_comparer(); the same string is also passed as the\n",
    "# model name when registering a model.\n",
    "experiment_name = 'chicago-taxi-1'"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Column-wise preprocessing: one-hot encode the start hour and the day of week with fixed\n",
    "# category lists, and standardise the numeric features.\n",
    "ct_pipe = ColumnTransformer(transformers=[\n",
    "    ('hourly_cat', OneHotEncoder(categories=[range(24)], sparse=False), ['trip_start_hour']),\n",
    "    ('dow',\n",
    "     OneHotEncoder(categories=[['Mon', 'Tue', 'Sun', 'Wed', 'Sat', 'Fri', 'Thu']], sparse=False),\n",
    "     ['trip_start_day_of_week']),\n",
    "    ('std_scaler', StandardScaler(), [\n",
    "        'trip_start_year', 'abs_distance',\n",
    "        'pickup_longitude', 'pickup_latitude',\n",
    "        'dropoff_longitude', 'dropoff_latitude',\n",
    "        'trip_miles', 'trip_seconds',\n",
    "    ]),\n",
    "])\n",
    "\n",
    "# 75/25 split with a fixed seed. Only X_train loses the target column; X_test keeps 'fare'.\n",
    "X_train, X_test, y_train, y_test = train_test_split(X, y, random_state=123, test_size=0.25)\n",
    "X_train = X_train.drop(columns='fare')"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# for more details: https://scikit-learn.org/stable/modules/generated/sklearn.ensemble.RandomForestRegressor.html\n",
    "rfr_pipe = Pipeline([\n",
    "    ('ct', ct_pipe),\n",
    "    ('forest_reg', RandomForestRegressor(\n",
    "        n_estimators = 20,\n",
    "        max_features = 'auto',\n",
    "        n_jobs = -1,\n",
    "        random_state = 3,\n",
    "        max_depth=None,\n",
    "        max_leaf_nodes=None,\n",
    "    ))\n",
    "])\n",
    "\n",
    "# 5-fold cross-validation on the training split. The scorer returns negated MSE,\n",
    "# so flip the sign before taking the square root to obtain per-fold RMSE.\n",
    "rfr_score = cross_val_score(rfr_pipe, X_train, y_train, scoring = 'neg_mean_squared_error', cv = 5)\n",
    "rfr_rmse = np.sqrt(-rfr_score)\n",
    "mlflow.log_metric('train_cross_valid_score_rmse_mean', rfr_rmse.mean())\n",
    "# Log the value the estimator was actually built with instead of repeating the literal,\n",
    "# so the logged parameter cannot drift from the model definition above.\n",
    "mlflow.log_param('number_of_estimators', rfr_pipe.named_steps['forest_reg'].n_estimators)\n",
    "# Last expression of the cell, so the mean RMSE is shown as the cell output.\n",
    "rfr_rmse.mean()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "#### Option 1: Simple training\n",
    "(~fast)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# To see all RandomForestRegressor hyper parameters:\n",
    "# estimator=RandomForestRegressor()\n",
    "# display(estimator.get_params())\n",
    "\n",
    "# Train model\n",
    "# NOTE(review): this experiment is 'chicago-taxi-0', while `experiment_name` (used by the\n",
    "# comparison and registration cells) is 'chicago-taxi-1' - confirm the difference is intended.\n",
    "mlflow.set_experiment('chicago-taxi-0')\n",
    "# mlflow.sklearn.autolog()\n",
    "# Fit the whole pipeline on the training split inside a nested MLflow run and log the\n",
    "# fitted pipeline under the artifact path 'chicago_rnd_forest'.\n",
    "with mlflow.start_run(nested=True) as mlflow_run:\n",
    "    final_model=rfr_pipe.fit(X_train, y_train)\n",
    "    mlflow.sklearn.log_model(final_model, 'chicago_rnd_forest')"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "#### Option 2: Parameter search + training\n",
    "(time consuming)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Grid over the forest size and the number of features considered per split,\n",
    "# scored with 5-fold negated MSE.\n",
    "param_grid = {'forest_reg__n_estimators': [5, 250], 'forest_reg__max_features': [6, 16, 'auto']}\n",
    "forest_gs = GridSearchCV(rfr_pipe, param_grid, cv = 5, scoring = 'neg_mean_squared_error', n_jobs = -1)\n",
    "forest_gs.fit(X_train, y_train)\n",
    "print(f'Best parameters: {forest_gs.best_params_}')\n",
    "print(f'Best score: {np.sqrt(-forest_gs.best_score_)}')\n",
    "\n",
    "# The key uses double quotes: reusing the f-string's own quote character inside the\n",
    "# braces is a SyntaxError on Python versions before 3.12.\n",
    "print(f'(All scores: {np.sqrt(-forest_gs.cv_results_[\"mean_test_score\"])})')\n",
    "\n",
    "# Pipeline refitted on the whole training split with the best parameter combination.\n",
    "final_model=forest_gs.best_estimator_"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Prediction test"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Take an explicit copy: a frame built with pd.DataFrame(X_test, ...) can share data with\n",
    "# X_test, so adding the prediction column could otherwise leak into the test set.\n",
    "X_pred = X_test.copy()\n",
    "# X_test still carries the target column; drop it so the model sees the same columns as in training.\n",
    "X_pred['fare_pred'] = final_model.predict(X_test.drop('fare',axis=1))\n",
    "X_pred.head(5)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Cross-validation score on the test set"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# 5-fold cross-validation of `final_model` on the test split. Note that cross_val_score\n",
    "# clones the estimator and refits it on each fold, so the fitted model itself is not scored.\n",
    "rfr_score = cross_val_score(final_model, X_test, y_test, cv=5, scoring='neg_mean_squared_error')\n",
    "# Scores are negated MSE: flip the sign, then take the root to get per-fold RMSE.\n",
    "rfr_rmse = np.sqrt(-rfr_score)\n",
    "mlflow.log_metric('eval_cross_valid_score_rmse_mean', rfr_rmse.mean())"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Comparer test\n",
    "def model_comparer(job_name, **kwargs):\n",
    "    \"\"\"Return the best MLflow run of a training job, i.e. the one with the lowest evaluation RMSE.\n",
    "\n",
    "    Searches the experiment named by the global `experiment_name` for runs whose\n",
    "    `job_name` tag starts with `job_name` followed by at least one character, displays\n",
    "    the matching runs and returns the run whose\n",
    "    `metrics.eval_cross_valid_score_rmse_mean` is smallest.\n",
    "\n",
    "    Args:\n",
    "        job_name: prefix of the `job_name` tag of the runs to compare.\n",
    "        **kwargs: accepted for call compatibility; not used.\n",
    "\n",
    "    Returns:\n",
    "        pandas Series: the row of the best run from mlflow.search_runs().\n",
    "\n",
    "    Raises:\n",
    "        ValueError: if the experiment does not exist or no run matches `job_name`.\n",
    "    \"\"\"\n",
    "    print(f'Model blessing: \"{job_name}\"')\n",
    "    experiment = mlflow.get_experiment_by_name(experiment_name)\n",
    "    if experiment is None:\n",
    "        raise ValueError(f'MLflow experiment \"{experiment_name}\" not found')\n",
    "    filter_string = f\"tags.job_name ILIKE '{job_name}_%'\"\n",
    "    runs = mlflow.search_runs([experiment.experiment_id], filter_string=filter_string)\n",
    "    display(runs)\n",
    "    if runs.empty:\n",
    "        raise ValueError(f'No runs found for job \"{job_name}\" in experiment \"{experiment_name}\"')\n",
    "    # RMSE is an error measure, so the best run is the one with the smallest value.\n",
    "    eval_best = runs.loc[runs['metrics.eval_cross_valid_score_rmse_mean'].idxmin()]\n",
    "\n",
    "    display(eval_best)\n",
    "    return eval_best\n",
    "\n",
    "# You need to set a previous training job name manually. Which is following this naming pattern: training_job_...time stamp...\n",
    "best_run = model_comparer('training_job_20210119T220534')"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "\n",
    "client = mlflow.tracking.MlflowClient()\n",
    "\n",
    "def register_model(run_id, model_name):\n",
    "    \"\"\"Register the model logged by `run_id` in the MLflow Model Registry.\n",
    "\n",
    "    `model_name` is used both as the artifact path inside the run\n",
    "    (`runs:/<run_id>/<model_name>`) and as the registered model name.\n",
    "    \"\"\"\n",
    "    model_uri = f'runs:/{run_id}/{model_name}'\n",
    "    registered_model = mlflow.register_model(model_uri, model_name)\n",
    "    print(registered_model)\n",
    "\n",
    "registered_models=client.search_registered_models(filter_string=f\"name='{experiment_name}'\", max_results=1, order_by=['timestamp DESC'])\n",
    "latest_versions = registered_models[0].latest_versions if registered_models else []\n",
    "if not latest_versions:\n",
    "    # No registered model, or one without any version yet: register the candidate unconditionally.\n",
    "    register_model(best_run.run_id, experiment_name)\n",
    "else:\n",
    "    last_version = latest_versions[0]\n",
    "    run = client.get_run(last_version.run_id)\n",
    "    print(run)\n",
    "    if not run:\n",
    "        # Nothing to compare against; do not fall through to the metric lookup.\n",
    "        print('Registered version run missing!')\n",
    "    else:\n",
    "        last_eval_metric=run.data.metrics['eval_cross_valid_score_rmse_mean']\n",
    "        best_run_metric=best_run['metrics.eval_cross_valid_score_rmse_mean']\n",
    "        # The metric is an RMSE (lower is better): replace the registered version only\n",
    "        # when the candidate's error is strictly smaller.\n",
    "        if best_run_metric<last_eval_metric:\n",
    "            print(f'Register better version with metric: {best_run_metric}')\n",
    "            register_model(best_run.run_id, experiment_name)\n",
    "        else:\n",
    "            print(f'Registered version still better. Metric: {last_eval_metric}')\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": []
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.7.8"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 4
}
